Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a bounded Async writer that prevents oom errors #174

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Kaldie
Copy link

@Kaldie Kaldie commented Jul 6, 2021

Solves the issue reported in #172
Another solution was to bound the queue, however that would be very unbalanced if the size writes were not equal. The downside of this that the write needs to have a len(). However that

@Kaldie
Copy link
Author

Kaldie commented Jul 13, 2021

@mtth can you have a look?

Copy link
Owner

@mtth mtth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the late review @Kaldie; thanks for taking a stab at this. I mostly have some concerns about the locking scheme, please have a look at the inline comments.

if buffersize is None:
return AsyncWriter(consumer)
else:
return BoundedAsyncWriter(consumer, buffer_size=buffersize)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffersize is an HDFS argument, better not to overload its use here. Let's introduce a separate argument instead.

@@ -31,6 +31,18 @@ def __init__(self, message, *args, **kwargs):
self.exception = kwargs.get("exception")


def wrapped_consumer(asyncWriter, data):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this private and use Python naming conventions (recommend shortening asyncWriter to writer).

@@ -31,6 +31,18 @@ def __init__(self, message, *args, **kwargs):
self.exception = kwargs.get("exception")


def wrapped_consumer(asyncWriter, data):
"""Wrapped consumer that lets us get a child's exception."""
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be updated now that the function is top-level.

Comment on lines +189 to +190
except RuntimeError:
pass
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a big code smell. Is there a way to avoid this? Locks are best used via with, perhaps it can help us here.

"""A Bounded asynchronous publisher-consumer.

:param consumer: Function which takes a single generator as argument.
:param buffer_size: Number of entities that are buffered. When this number is exeeded,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please stick to 80 chars per line throughout.

_logger.debug('Child terminated without errors.')
self._queue = None

def write(self, chunk):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much of the logic in this class is identical to the original writer, is there a way to consolidate?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants